
[SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3 #34864

Closed
wants to merge 16 commits

Conversation

hiboyang

@hiboyang hiboyang commented Dec 10, 2021

What changes were proposed in this pull request?

This PR (design doc) adds support for storing shuffle files on external shuffle storage such as S3. It helps Dynamic Allocation on Kubernetes: the Spark driver can release idle executors without worrying about losing shuffle data, because the shuffle data is stored on external shuffle storage that is separate from the executors.

This could be viewed as a followup work for https://issues.apache.org/jira/browse/SPARK-25299.

There is a previous Worker Decommission feature (SPARK-33545), which is a great feature that copies shuffle data to fallback storage like S3. People appreciate that work for addressing the critical issue of handling shuffle data when Spark executors terminate. The work in this PR does not intend to replace that feature. The intent is to start further discussion about how to save shuffle data to S3 during normal execution time.

Why are the changes needed?

To better support Dynamic Allocation on Kubernetes, we need to decouple shuffle data from Spark executors. This PR implements another ShuffleManager and supports writing shuffle data to S3.

Does this PR introduce any user-facing change?

Yes, this PR adds the following two Spark configs to plug in the StarShuffleManager and store shuffle data in the provided S3 location.

spark.shuffle.manager=org.apache.spark.shuffle.StarShuffleManager
spark.shuffle.star.rootDir=s3://my_bucket_name/my_shuffle_folder
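
For illustration, the same settings could also be applied programmatically; a minimal Java sketch, where the bucket and folder names are placeholders:

import org.apache.spark.SparkConf;

public final class StarShuffleConfExample {
    public static void main(String[] args) {
        // Plug in the shuffle manager proposed in this PR and point it at an
        // S3 location (placeholder bucket/folder).
        SparkConf conf = new SparkConf()
            .setAppName("star-shuffle-example")
            .set("spark.shuffle.manager", "org.apache.spark.shuffle.StarShuffleManager")
            .set("spark.shuffle.star.rootDir", "s3://my_bucket_name/my_shuffle_folder");
        System.out.println(conf.toDebugString());
    }
}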

How was this patch tested?

Added a unit test for StarShuffleManager. Many classes are copied from Spark, so tests were not added for those classes. We will work with the community to get feedback first, then work on removing the code duplication.

@SparkQA

SparkQA commented Dec 10, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50556/

@SparkQA

SparkQA commented Dec 10, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50556/

@c21
Contributor

c21 commented Dec 11, 2021

@hiboyang, thanks for the work here! Could you create a design doc for this? That might help attract more people's attention and make it easier for them to understand.

@hiboyang
Author

@hiboyang, thanks for the work here! Could you create a design doc for this? That might help attract more people's attention and make it easier for them to understand.

Yes, good suggestion. Will create a design doc.

@SparkQA

SparkQA commented Dec 11, 2021

Test build #146081 has finished for PR 34864 at commit 4b67c8a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ByteBufUtils
  • public class StarBlockStoreClient extends BlockStoreClient
  • public class StarBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V>
  • public class StarLocalFileShuffleFileManager implements StarShuffleFileManager
  • public class StarMapResultFileInfo
  • public class StarS3ShuffleFileManager implements StarShuffleFileManager
  • public static class S3BucketAndKey
  • public class StarUtils
  • public class StartFileSegmentWriter
  • class StarShuffleManager(conf: SparkConf) extends ShuffleManager with Logging
  • final class StarShuffleBlockFetcherIterator(
  • case class FetchRequest(
  • case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends FetchResult

@linzebing
Contributor

Quickly glanced through the code; it seems that for writing shuffle data we write locally first and then upload to S3, and similarly for reading shuffle data we download to a local temp file first and then read from it.

We should be able to write/read directly to/from S3, right?

Member

@dongjoon-hyun dongjoon-hyun left a comment


Hi, All. Thank you!

BTW, for the record, Apache Spark 3.1+ already stores its shuffle files into the external storage like S3 and reads back from it.

  • [SPARK-33545][CORE] Support Fallback Storage during Worker decommission (Apache Spark 3.1.0)
  • [SPARK-34142][CORE] Support Fallback Storage Cleanup during stopping SparkContext (Apache Spark 3.2.0)
  • [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter (Apache Spark 3.3.0)

It would be great not to ignore the existing Spark feature and avoid over-claiming.

Dynamic allocation is the same. Apache Spark has been supporting Dynamic Allocation in K8s too.

@hiboyang
Author

Quickly glanced through the code; it seems that for writing shuffle data we write locally first and then upload to S3, and similarly for reading shuffle data we download to a local temp file first and then read from it.

We should be able to write/read directly to/from S3, right?

Thanks for looking! Yes, we should be able to write/read directly to/from S3. This PR is a prototype; the code and the performance of writing/reading shuffle data on S3 still need improvement.

@hiboyang
Author

Hi, All. Thank you!

BTW, for the record, Apache Spark 3.1+ already stores its shuffle files into the external storage like S3 and reads back from it.

  • [SPARK-33545][CORE] Support Fallback Storage during Worker decommission (Apache Spark 3.1.0)
  • [SPARK-34142][CORE] Support Fallback Storage Cleanup during stopping SparkContext (Apache Spark 3.2.0)
  • [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter (Apache Spark 3.3.0)

It would be great not to ignore the existing Spark feature and avoid over-claiming.

Dynamic allocation is the same. Apache Spark has been supporting Dynamic Allocation in K8s too.

Right, Spark has shuffle tracking to support Dynamic Allocation on Kubernetes, but it does not work well when shuffle data is distributed across many executors (those executors cannot be released).

The work here (storing shuffle data on S3) does not conflict with the worker decommission feature. The eventual goal is to store shuffle data on S3 or other external storage directly. Before getting there, people can still use the worker decommission feature.

@dongjoon-hyun
Member

You are completely wrong because you already know the worker decommission feature.

but it will not work well when there is shuffle data distributed on many executors (those executors cannot be released).

You should mention this in the PR description explicitly instead of misleading the users.

The work here (storing shuffle data on S3) does not conflict with worker decommission feature. The eventual goal is to store shuffle data on S3 or other external storage directly.

@hiboyang
Author

Hi Dongjoon, there seem to be some misunderstandings here. I am writing a design doc for this PR; hopefully it will help clarify things and address your questions.

You are completely wrong because you already know the worker decommission feature.

but it will not work well when there is shuffle data distributed on many executors (those executors cannot be released).

You should mention this in the PR description explicitly instead of misleading the users.

The work here (storing shuffle data on S3) does not conflict with worker decommission feature. The eventual goal is to store shuffle data on S3 or other external storage directly.

@SparkQA

SparkQA commented Dec 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50620/

@SparkQA

SparkQA commented Dec 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50620/

@SparkQA

SparkQA commented Dec 14, 2021

Test build #146147 has finished for PR 34864 at commit 8222f38.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50775/

@SparkQA

SparkQA commented Dec 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50775/

@SparkQA

SparkQA commented Dec 17, 2021

Test build #146303 has finished for PR 34864 at commit 761fe2a.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hiboyang
Author

Added a design doc for this prototype.

Contributor

@steveloughran steveloughran left a comment


Some quick comments on the PR.

<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Contributor


You should pull in spark-hadoop-cloud and so indirectly get its shaded full AWS SDK. Yes, it's big, but it guarantees a consistent set of its own dependencies (HTTP client, Jackson, etc.), and because it includes support for services like STS and S3 events, it lets you add new features with guaranteed consistency of AWS artifacts.

Author


Thanks for the suggestion! Yes, I was thinking of using that Hadoop library as well, but did not do it because I wanted to start small with this prototype. Switching to the Hadoop library sounds like a good idea.

ManagedBuffer managedBuffer = downloadFileWritableChannel.closeAndRead();
listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
} catch (IOException e) {
throw new RuntimeException(String.format(
Contributor


Include the inner exception text in the message and supply the exception as the cause in the constructor.
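
For illustration, a minimal sketch of the requested change; the helper name and block-ID parameter are assumptions, not code from the PR:

import java.io.IOException;

final class ShuffleFetchErrors {
    // Keep the original IOException as the cause and surface its message,
    // so the stack trace and error text are not lost.
    static RuntimeException fetchFailure(String blockIdStr, IOException e) {
        return new RuntimeException(String.format(
            "Failed to fetch shuffle block %s: %s", blockIdStr, e.getMessage()), e);
    }
}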

NioManagedBuffer managedBuffer = new NioManagedBuffer(byteBuffer);
listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
} catch (IOException e) {
throw new RuntimeException(String.format(
Contributor


Again, pass on inner exception details.

Author

@hiboyang hiboyang Jan 5, 2022


Yes, good catch! Will add inner exception!

import scala.collection.Iterator;

import javax.annotation.Nullable;
import java.io.*;
Contributor


A bit brittle against JVM releases adding new classes here.

Author


Good point, let me remove .* here.
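
For illustration, the wildcard could be replaced with explicit imports of only the classes actually used; the exact list below is an assumption, not the file's real import set:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;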

public static final String DEFAULT_AWS_REGION = Regions.US_WEST_2.getName();

private static TransferManager transferManager;
private static Object transferManagerLock = new Object();
Contributor


final

Author


yes!
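
For illustration, a sketch of the suggested change; the lazy initialization shown here is an assumption about the surrounding code, not the PR's exact logic:

import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

final class TransferManagerHolder {
    private static TransferManager transferManager;
    // final: the lock object is assigned once and never replaced
    private static final Object transferManagerLock = new Object();

    static TransferManager getOrCreate() {
        synchronized (transferManagerLock) {
            if (transferManager == null) {
                transferManager = TransferManagerBuilder.standard().build();
            }
            return transferManager;
        }
    }
}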

throw new RuntimeException(String.format(
"Failed to download shuffle file %s", s3Url));
} finally {
transferManager.shutdownNow();
Contributor


What if the transfer manager is reused?

Author


This was a code mistake. I should not shut down the transfer manager here. Will remove this.
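
For illustration, a sketch of a per-download path that leaves the shared TransferManager running; the method and parameter names are assumptions:

import java.io.File;

import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;

final class S3ShuffleDownloads {
    // The shared TransferManager is reused across downloads, so it is not
    // shut down here; shut it down once when the shuffle manager stops.
    static void downloadTo(TransferManager transferManager,
                           String bucket, String key, File destination)
            throws InterruptedException {
        Download download = transferManager.download(bucket, key, destination);
        download.waitForCompletion();
    }
}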

@steveloughran
Contributor

Obviously I am biased, but I believe that rather than trying to use the AWS APIs yourself, you should just use the hadoop file system APIs and interact with S3 through the s3a connector.

For a high-performance upload of a local file, use FileSystem.copyFromLocalFile; in s3a on Hadoop 3.3.2 this uses the same transfer manager class as this PR does, but adds exception handling/mapping, encryption settings, and auditing. And the s3a integration tests verify all this works... By the time you get to use it here you can assume the S3 upload works, and it becomes a matter of linking it up to Spark.

As copyFromLocalFile is implemented for all filesystems, it means the component will also work with other stores including google cloud and azure abfs, even if they do not override the base method for a high-performance implementation -yet.

This also means that you could write tests for the feature using file:// as the destination store and include these in the spark module; if you design such tests to be overrideable to work with other file systems, they could be picked up and reused as the actual integration test suites in an external module.

And, because someone else owns the problem of the s3 connector binding, you get to avoid fielding support calls related to configuring of AWS endpoint, region, support for third-party s3 stores, qualifying AWS SDK updates, etc.

Accordingly, I would propose

Getting integration tests set up is inevitably going to be somewhat complicated. I can provide a bit of consultation there.
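
For illustration, a minimal Java sketch of the suggested approach through the Hadoop FileSystem API; the shuffle root URI and file names are placeholders, not the PR's actual layout:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class HadoopFsShuffleUpload {
    // Upload a local shuffle file through the Hadoop FileSystem API. With the
    // s3a connector this benefits from its upload path, retries and auditing,
    // and the same code also works against file://, abfs://, gs://, etc.
    static void upload(String shuffleRootDir, String localPath, String remoteName)
            throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(shuffleRootDir), conf);
        Path src = new Path(localPath);
        Path dst = new Path(shuffleRootDir, remoteName);
        // delSrc = false, overwrite = true
        fs.copyFromLocalFile(false, true, src, dst);
    }
}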

@hiboyang
Author

hiboyang commented Jan 5, 2022

Obviously I am biased, but I believe that rather than trying to use the AWS APIs yourself, you should just use the hadoop file system APIs and interact with S3 through the s3a connector.

For a high-performance upload of a local file, use FileSystem.copyFromLocalFile; in s3a on Hadoop 3.3.2 this uses the same transfer manager class as this PR does, but adds exception handling/mapping, encryption settings, and auditing. And the s3a integration tests verify all this works... By the time you get to use it here you can assume the S3 upload works, and it becomes a matter of linking it up to Spark.

As copyFromLocalFile is implemented for all filesystems, it means the component will also work with other stores including google cloud and azure abfs, even if they do not override the base method for a high-performance implementation -yet.

This also means that you could write tests for the feature using file:// as the destination store and include these in the spark module; if you design such tests to be overrideable to work with other file systems, they could be picked up and reused as the actual integration test suites in an external module.

And, because someone else owns the problem of the s3 connector binding, you get to avoid fielding support calls related to configuring of AWS endpoint, region, support for third-party s3 stores, qualifying AWS SDK updates, etc.

Accordingly, I would propose

Getting integration tests set up is inevitably going to be somewhat complicated. I can provide a bit of consultation there.

Yes, these are great suggestions! Thanks again! I will find time to make changes for this, and may also reach out to you for consultation when adding integration tests :)

@hiboyang hiboyang closed this Jan 5, 2022
@hiboyang hiboyang reopened this Jan 5, 2022
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 16, 2022
@github-actions github-actions bot closed this Apr 17, 2022
@pspoerri

pspoerri commented May 5, 2022

@hiboyang I looked at your work earlier this year and I wanted to let you know that I used it as a basis for a shuffle plugin. Ultimately I decided to rewrite the plugin from scratch (except the tests) and base it on the design of the Spark shuffle manager.

The code is available here: https://github.com/ibm/spark-s3-shuffle/. It acts as an external Spark plugin and can be loaded into Spark binary releases.

I'm open to contributing this work back to Apache Spark if there is any interest.

@dongjoon-hyun
Member

The Apache Spark community is open to any contribution, @pspoerri. You can make your own PR.
BTW, do you have any shareable results in terms of stability and performance, @pspoerri?

@hiboyang
Author

hiboyang commented May 6, 2022

Hi @pspoerri, great that you are working on this, and thanks for letting us know! I stopped working on my previous PR due to a change in work priorities, but I would still like to see people continue working in this area.

There is big value in storing Spark shuffle data on S3. It will save cost and also make Spark more resilient to disk errors.

In my previous experiments, shuffle data on S3 had much worse performance. A lot of optimizations are needed, e.g. S3 key prefix randomization to avoid S3 throttling and asynchronous S3 writes. I will be happy to hear your thoughts on this as well.
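
For illustration, a hypothetical sketch of key-prefix randomization; the key layout and names are assumptions, not the prototype's actual scheme:

import java.util.concurrent.ThreadLocalRandom;

final class ShuffleKeyLayout {
    // Spread shuffle objects over many key prefixes so request rates are not
    // concentrated on a single prefix, which S3 throttles.
    static String shuffleKey(String rootDir, int shuffleId, long mapId, int reduceId) {
        int prefix = ThreadLocalRandom.current().nextInt(256);
        // The chosen prefix must also be recorded (e.g. in the map status) so
        // that readers can locate the object later; omitted in this sketch.
        return String.format("%s/%02x/shuffle_%d_%d_%d.data",
                rootDir, prefix, shuffleId, mapId, reduceId);
    }
}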

@michaelbilow

+1, would be great to get this working and part of the Spark ecosystem.

@steveloughran
Contributor

@michaelbilow Hadoop s3a is on the v2 SDK; the com.amazonaws classes are not on the classpath and Amazon is slowly stopping support. You cannot, for example, use the lower-latency S3 Express stores with it.

Like I say: I think you would be better off using the Hadoop file system APIs to talk to S3. If there are aspects of S3 storage which aren't available through the API, or only available very inefficiently due to the effort to preserve the POSIX metaphor, then let's fix the API so that other stores can offer the same features and other apps can pick them up.

For example, here's our ongoing delete API for Iceberg and other manifest-based tables:
apache/hadoop#6726
It maps to S3 bulk delete calls, but there's scope to add it to other stores (we now actually want to add it as a page-size == 1 option for all filesystems, as it simplifies Iceberg integration).

@pspoerri

@steveloughran How do I call the Hadoop file system APIs from Spark? Can you point me to a package?
I agree with you that the Hadoop APIs are not ideal performance-wise, but they are great from a usability and portability perspective.

Another issue is that Hadoop wants to know the size of every file it wants to read. While this makes sense for formats like Parquet, where the footer is located in the last few bytes of the file, it does not make sense for shuffle, where you know the exact block/file you want to read.

@xleoken
Member

xleoken commented Sep 7, 2024

+1, would be great to get this working and part of the Spark ecosystem.
